/*******************************************************************************

  Pilot Intelligence Library
    http://www.pilotintelligence.com/

  ----------------------------------------------------------------------------

    This program is free software: you can redistribute it and/or modify
    it under the terms of the GNU General Public License as published by
    the Free Software Foundation, either version 3 of the License, or
    any later version.

    This program is distributed in the hope that it will be useful,
    but WITHOUT ANY WARRANTY; without even the implied warranty of
    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
    GNU General Public License for more details.

    You should have received a copy of the GNU General Public License
    along with this program. If not, see <http://www.gnu.org/licenses/>.

*******************************************************************************/

#include <stdint.h>

#include <set>
#include <deque>

#include "base/utils/utils.h"
#include "base/utils/crc.h"

#include "NetTransfer_UDP.h"

namespace pi {

////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

///
/// \ref
///
/// UDP buffer size:
///         Linux           131071
///         Windows         No known limit
///         Solaris         262144
///         FreeBSD, Darwin	262144
///         AIX             1048576
///
///     https://access.redhat.com/documentation/en-US/JBoss_Enterprise_Web_Platform/5/html/Administration_And_Configuration_Guide/jgroups-perf-udpbuffer.html
///
/// How to change the kernel UDP receive buffer size:
///     http://lcm.googlecode.com/svn-history/r452/www/reference/lcm/multicast-setup.html#table:kernel-udp-buffer-resizing
///
/// UDP packet loss statistics:
///     netstat -su
///
///     http://www.cnblogs.com/mengyan/archive/2012/10/04/2711340.html
///     http://blog.sina.com.cn/s/blog_a459dcf5010153mp.html
///


#define NETTRANSFER_PACKET_HEADER_SIZE      32
#define NETTRANSFER_PACKET_DATASIZE         16384
#define NETTRANSFER_PACKET_MAXSIZE          (NETTRANSFER_PACKET_HEADER_SIZE*2+NETTRANSFER_PACKET_DATASIZE)

#define NETTRANSFER_PACKET_MAGICNUM         0xA55AA55A



////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

///
/// \brief The NetTransfer_Packet class represents one packet as it travels on
///        the wire: a fixed-size header followed by the payload bytes.
///
/// The header consists of eight 32-bit words, written to the buffer after
/// conversion with convByteOrder_h2n():
///     magicNum, msgID, msgLen, packN, packIdx, packLen, crcHeader, crcData
///
/// crcHeader is crc32() over the first six words (24 bytes) in host
/// representation; crcData is crc32() over the packLen payload bytes that
/// follow the header in this packet.
///
class NetTransfer_Packet
{
public:
    /// Create an empty packet that owns no buffer.
    NetTransfer_Packet() :
        magicNum(NETTRANSFER_PACKET_MAGICNUM),
        msgID(0), msgLen(0),
        packN(0), packIdx(0), packLen(0),
        crcHeader(0), crcData(0),
        m_len(0), m_data(NULL) {}

    ///
    /// \brief Adopt an existing buffer and decode its header.
    ///
    /// The packet stores \p dat without copying and releases it with delete[]
    /// in the destructor, so the buffer must come from new uint8_t[].
    /// All header fields are zeroed first so they are defined even when
    /// parsing stops early.
    ///
    /// \param dat              - buffer holding header + payload
    /// \param len              - buffer length in bytes
    ///
    NetTransfer_Packet(uint8_t* dat, int len) :
        magicNum(0),
        msgID(0), msgLen(0),
        packN(0), packIdx(0), packLen(0),
        crcHeader(0), crcData(0),
        m_len(len), m_data(dat) {
        parse(dat, len, 0);
    }

    /// Deep copy: the new packet gets its own buffer.
    NetTransfer_Packet(const NetTransfer_Packet& ref) :
        magicNum(ref.magicNum),
        msgID(ref.msgID), msgLen(ref.msgLen),
        packN(ref.packN), packIdx(ref.packIdx), packLen(ref.packLen),
        crcHeader(ref.crcHeader), crcData(ref.crcData),
        m_len(0), m_data(NULL) {
        cloneBuffer(ref);
    }

    /// Deep-copy assignment; the previously owned buffer is released.
    NetTransfer_Packet& operator=(const NetTransfer_Packet& ref) {
        if( this != &ref ) {
            magicNum    = ref.magicNum;
            msgID       = ref.msgID;
            msgLen      = ref.msgLen;
            packN       = ref.packN;
            packIdx     = ref.packIdx;
            packLen     = ref.packLen;
            crcHeader   = ref.crcHeader;
            crcData     = ref.crcData;

            cloneBuffer(ref);
        }

        return *this;
    }

    ~NetTransfer_Packet() {
        free();
    }


    ///
    /// \brief parse input data to packet
    ///
    /// \param dat              - data buffer (header + payload)
    /// \param len              - buffer length
    /// \param doCopy           - non-zero: copy \p dat into an internal buffer
    ///                           and decode the copy;
    ///                           zero: decode \p dat in place, the internal
    ///                           buffer is left untouched
    ///
    /// \return  0 on success,
    ///         -1 buffer missing / shorter than the header / header CRC wrong,
    ///         -2 buffer shorter than header + packLen,
    ///         -3 payload CRC wrong
    ///
    int parse(uint8_t* dat, int len, int doCopy=1) {
        // reject before touching memory: a short or missing buffer cannot
        // hold a header, and a negative length must never reach new[]
        if( dat == NULL || len < NETTRANSFER_PACKET_HEADER_SIZE ) return -1;

        uint8_t *src = dat;

        // copy data (skipped when asked to re-parse our own buffer, because
        // malloc() would release the source before it is copied)
        if( doCopy ) {
            if( dat != m_data ) {
                malloc(len);
                memcpy(m_data, dat, len);
            }
            src = m_data;
        }

        // parse header data; memcpy avoids unaligned 32-bit loads when the
        // caller's buffer is not word aligned
        uint32_t hdr[8];
        for(int i=0; i<8; i++) {
            uint32_t w;
            memcpy(&w, src + 4*i, sizeof(w));
            hdr[i] = convByteOrder_n2h(w);
        }

        magicNum    = hdr[0];
        msgID       = hdr[1];
        msgLen      = hdr[2];
        packN       = hdr[3];
        packIdx     = hdr[4];
        packLen     = hdr[5];
        crcHeader   = hdr[6];
        crcData     = hdr[7];

        // check header CRC (first six words, host representation)
        uint32_t crcH = crc32(hdr, 24);
        if( crcH != crcHeader ) {
            dbg_pe("Header CRC error!");
            return -1;
        }

        // check data length; written as a subtraction so a huge packLen
        // cannot wrap around and slip past the test
        uint32_t payloadRoom = static_cast<uint32_t>(len - NETTRANSFER_PACKET_HEADER_SIZE);
        if( packLen > payloadRoom ) {
            dbg_pe("Input data length wrong!");
            return -2;
        }

        // check data CRC
        uint32_t crcD = crc32(src+NETTRANSFER_PACKET_HEADER_SIZE, packLen);
        if( crcD != crcData ) {
            dbg_pe("Data CRC error!");
            return -3;
        }

        return 0;
    }


    ///
    /// \brief generate packet
    ///
    /// Builds packet number \p pIdx of a message: the payload is the slice of
    /// \p dat starting at pIdx*NETTRANSFER_PACKET_DATASIZE, at most
    /// NETTRANSFER_PACKET_DATASIZE bytes long.
    ///
    /// \param dat              - complete message data
    /// \param datSize          - complete message size in bytes
    /// \param mID              - message id
    /// \param pN               - number of packets of the message
    /// \param pIdx             - index of the packet to generate
    ///
    /// \return 0 on success, -1 on invalid arguments (negative size or index,
    ///         missing data, slice starting beyond the end of the message)
    ///
    int genPacket(uint8_t* dat, int datSize,
                  int mID,
                  int pN, int pIdx) {
        if( datSize < 0 || pIdx < 0 ) return -1;
        if( dat == NULL && datSize > 0 ) return -1;

        // determine the payload slice; 64-bit math so the offset cannot wrap
        uint64_t offset = static_cast<uint64_t>(pIdx) * NETTRANSFER_PACKET_DATASIZE;
        if( offset > static_cast<uint64_t>(datSize) ) return -1;

        uint32_t pLen = static_cast<uint32_t>(datSize - offset);
        if( pLen > NETTRANSFER_PACKET_DATASIZE ) pLen = NETTRANSFER_PACKET_DATASIZE;

        malloc(pLen + NETTRANSFER_PACKET_HEADER_SIZE);

        uint8_t *payload = m_data + NETTRANSFER_PACKET_HEADER_SIZE;
        if( pLen > 0 ) memcpy(payload, dat + offset, pLen);

        // header in host representation
        uint32_t hdr[8];
        hdr[0] = NETTRANSFER_PACKET_MAGICNUM;
        hdr[1] = mID;
        hdr[2] = datSize;
        hdr[3] = pN;
        hdr[4] = pIdx;
        hdr[5] = pLen;
        hdr[6] = crc32(hdr, 24);
        // the CRC covers only this packet's payload, which is what parse()
        // verifies on the receiving side
        hdr[7] = crc32(payload, pLen);

        magicNum    = hdr[0];
        msgID       = hdr[1];
        msgLen      = hdr[2];
        packN       = hdr[3];
        packIdx     = hdr[4];
        packLen     = hdr[5];
        crcHeader   = hdr[6];
        crcData     = hdr[7];

        // generate header data
        for(int i=0; i<8; i++) {
            uint32_t w = convByteOrder_h2n(hdr[i]);
            memcpy(m_data + 4*i, &w, sizeof(w));
        }

        return 0;
    }


    /// \return pointer to the payload inside the internal buffer, or NULL
    ///         when the buffer is missing or holds no payload bytes
    uint8_t* getData(void) {
        if( m_len > NETTRANSFER_PACKET_HEADER_SIZE && m_data ) {
            return m_data + NETTRANSFER_PACKET_HEADER_SIZE;
        }

        return NULL;
    }

protected:
    /// Replace the internal buffer by a fresh one of \p dataLen bytes.
    /// \return 0
    int malloc(int dataLen) {
        if( m_data ) delete [] m_data;

        m_len = dataLen;
        m_data = new uint8_t[m_len];

        return 0;
    }

    /// Release the internal buffer.
    /// \return 0
    int free(void) {
        if( m_data ) delete [] m_data;

        m_len = 0;
        m_data = NULL;

        return 0;
    }

private:
    /// Make the internal buffer a private copy of the one held by \p ref.
    void cloneBuffer(const NetTransfer_Packet& ref) {
        if( ref.m_data != NULL && ref.m_len > 0 ) {
            malloc(ref.m_len);
            memcpy(m_data, ref.m_data, ref.m_len);
        } else {
            free();
        }
    }


public:
    uint32_t            magicNum;                       ///< magic number
    uint32_t            msgID;                          ///< message id
    uint32_t            msgLen;                         ///< message total size
    uint32_t            packN;                          ///< number of packets of the message
    uint32_t            packIdx;                        ///< current packet index
    uint32_t            packLen;                        ///< data length of current packet
    uint32_t            crcHeader;                      ///< CRC of header
    uint32_t            crcData;                        ///< CRC of data

protected:
    int                 m_len;                          ///< data buffer length
                                                        ///<    (total size including header)
    uint8_t             *m_data;                        ///< all data buffer
};


///
/// \brief The NetTransfer_Message class stands for a complete user-level
///        message, i.e. the data a user sends or receives.
///
/// It currently has no members besides an empty default constructor.
///
class NetTransfer_Message
{
public:
    /// Nothing to initialise.
    NetTransfer_Message()
    {
    }
};



////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

///
/// \brief The packet for sending
///
/// Owns a buffer holding one ready-to-send packet: a 32-byte header (eight
/// 32-bit words converted with convByteOrder_h2n) followed by the payload.
/// Copies are deep, each instance owns its own buffer.
///
class NetTransfer_Packet_Send
{
public:
    NetTransfer_Packet_Send() :
        m_dat(NULL), m_len(0),
        m_msgID(0), m_packN(0), m_packIdx(0) {}

    /// Deep copy. m_dat is set to NULL before any allocation because
    /// Packet_malloc() inspects (and deletes) the current pointer.
    NetTransfer_Packet_Send(const NetTransfer_Packet_Send& ref) :
        m_dat(NULL), m_len(0),
        m_msgID(ref.m_msgID), m_packN(ref.m_packN), m_packIdx(ref.m_packIdx) {
        copyBuffer(ref);
    }

    /// Deep-copy assignment; without it the implicit version would share the
    /// buffer between two owners and delete it twice.
    NetTransfer_Packet_Send& operator=(const NetTransfer_Packet_Send& ref) {
        if( this != &ref ) {
            m_msgID     = ref.m_msgID;
            m_packN     = ref.m_packN;
            m_packIdx   = ref.m_packIdx;

            copyBuffer(ref);
        }

        return *this;
    }

    ~NetTransfer_Packet_Send() {
        Packet_free();
    }

    ///
    /// \brief Build packet number \p packIdx of a message into the buffer.
    ///
    /// The payload is the slice of \p dat that starts at
    /// packIdx*NETTRANSFER_PACKET_DATASIZE and is at most
    /// NETTRANSFER_PACKET_DATASIZE bytes long (it may be empty when the slice
    /// starts exactly at the end of the message).
    ///
    /// \param dat              - complete message data
    /// \param len              - complete message length in bytes
    /// \param msgID            - message id written to the header
    /// \param packN            - number of packets of the message
    /// \param packIdx          - index of the packet to build
    /// \param checkmode        - not used by this function
    ///
    /// \return total packet size (header + payload) in bytes, or -1 when the
    ///         slice would start beyond the end of the message or \p dat is
    ///         NULL while payload bytes are required
    ///
    int generatePacket(uint8_t *dat, uint32_t len,
                       uint32_t msgID, uint32_t packN, uint32_t packIdx,
                       int checkmode) {
        (void) checkmode;

        // locate the slice; 64-bit math so a large packIdx cannot wrap
        uint64_t offset = static_cast<uint64_t>(packIdx) * NETTRANSFER_PACKET_DATASIZE;
        if( offset > len ) return -1;

        // determine packLen
        uint32_t packLen = len - static_cast<uint32_t>(offset);
        if( packLen > NETTRANSFER_PACKET_DATASIZE ) packLen = NETTRANSFER_PACKET_DATASIZE;

        if( dat == NULL && packLen > 0 ) return -1;

        // output buffer size
        uint32_t lenOut = NETTRANSFER_PACKET_HEADER_SIZE + packLen;

        // memory expand (an existing, large enough buffer is reused)
        if( m_dat == NULL || lenOut > m_len ) {
            Packet_malloc(lenOut);
        } else {
            m_len = lenOut;
        }

        // copy payload
        uint8_t *pDatDst = m_dat + NETTRANSFER_PACKET_HEADER_SIZE;
        if( packLen > 0 ) memcpy(pDatDst, dat + offset, packLen);

        // generate header
        uint32_t msgHeader[8];

        msgHeader[0] = NETTRANSFER_PACKET_MAGICNUM;
        msgHeader[1] = msgID;
        msgHeader[2] = len;
        msgHeader[3] = packN;
        msgHeader[4] = packIdx;
        msgHeader[5] = packLen;
        msgHeader[6] = crc32(msgHeader, 24);
        // CRC over the copied payload: same bytes as the source slice, and a
        // valid pointer even for an empty payload
        msgHeader[7] = crc32(pDatDst, packLen);

        // m_dat comes from new[], so it is suitably aligned for 32-bit stores
        uint32_t *p = (uint32_t*) m_dat;
        for(int i=0; i<8; i++) {
            *(p++) = convByteOrder_h2n(msgHeader[i]);
        }

        m_msgID     = msgID;
        m_packN     = packN;
        m_packIdx   = packIdx;

        return lenOut;
    }

public:
    uint8_t                     *m_dat;         ///< packet buffer (header + payload)
    uint32_t                     m_len;         ///< number of valid bytes in m_dat

    uint32_t                    m_msgID;        ///< message id of the last generated packet
    uint32_t                    m_packN;        ///< packet count of that message
    uint32_t                    m_packIdx;      ///< index of the last generated packet

protected:
    /// Replace the buffer by a fresh one of \p _len bytes.
    /// \return 0
    int Packet_malloc(uint32_t _len) {
        if( m_dat ) delete [] m_dat;

        m_dat = new uint8_t[_len];
        m_len = _len;

        return 0;
    }

    /// Release the buffer.
    /// \return 0
    int Packet_free(void) {
        if( m_dat ) delete [] m_dat;

        m_dat = NULL;
        m_len = 0;

        return 0;
    }

private:
    /// Make the buffer a private copy of the one held by \p ref.
    void copyBuffer(const NetTransfer_Packet_Send& ref) {
        if( ref.m_dat != NULL && ref.m_len > 0 ) {
            Packet_malloc(ref.m_len);
            memcpy(m_dat, ref.m_dat, ref.m_len);
        } else {
            Packet_free();
        }
    }
};


///
/// \brief Reassembly buffer for one incoming message.
///
/// The buffer has room for msg_packN fragments of NETTRANSFER_PACKET_DATASIZE
/// bytes each; fragment i is stored at offset i*NETTRANSFER_PACKET_DATASIZE.
/// The set \c bitmap records which fragment indices have been stored.
///
class NetTransfer_Packet_Recv
{
public:
    NetTransfer_Packet_Recv() :
        dat(NULL),
        msg_magicNum(0), msg_id(0), msg_len(0), msg_packN(0),
        timeStamp(0.0) {}

    ~NetTransfer_Packet_Recv() {
        if( dat ) delete [] dat;
        dat = NULL;
    }

    ///
    /// \brief Store one fragment in the reassembly buffer.
    ///
    /// Fragment index and length normally come straight from the network, so
    /// they are validated before anything is written: a fragment that does
    /// not fit into the allocated buffer is ignored.
    ///
    /// \param datBuf           - fragment payload
    /// \param len              - payload length in bytes
    /// \param packIdx          - fragment index inside the message
    ///
    /// \return 1 when all msg_packN fragments have been stored,
    ///         0 otherwise (including rejected fragments)
    ///
    int inputData(uint8_t *datBuf, uint32_t len, uint32_t packIdx) {
        // packIdx < msg_packN and len <= DATASIZE together guarantee that
        // the write below stays inside the DATASIZE*msg_packN byte buffer
        if( dat == NULL ) return 0;
        if( packIdx >= msg_packN ) return 0;
        if( len > NETTRANSFER_PACKET_DATASIZE ) return 0;
        if( datBuf == NULL && len > 0 ) return 0;

        timeStamp = tm_getTimeStamp();

        bitmap.insert(packIdx);

        if( len > 0 ) {
            uint8_t *p = dat + static_cast<size_t>(packIdx)*NETTRANSFER_PACKET_DATASIZE;
            memcpy(p, datBuf, len);
        }

        if( bitmap.size() == msg_packN )
            return 1;
        else
            return 0;
    }

    ///
    /// \brief Allocate the reassembly buffer for msg_packN fragments.
    ///
    /// \return 0 on success, -1 when the required size does not fit into
    ///         size_t (dat is NULL in that case)
    ///
    int malloc(void) {
        if( dat ) delete [] dat;
        dat = NULL;

        // 64-bit product: the 32-bit one wraps for large msg_packN and would
        // silently allocate a buffer that is too small
        uint64_t total = static_cast<uint64_t>(NETTRANSFER_PACKET_DATASIZE) * msg_packN;
        size_t   n     = static_cast<size_t>(total);
        if( static_cast<uint64_t>(n) != total ) return -1;

        dat = new uint8_t[n];
        return 0;
    }

    /// Release the buffer and forget which fragments were received.
    /// \return 0
    int data_free(void) {
        if( dat ) delete [] dat;
        dat = NULL;

        bitmap.clear();

        return 0;
    }


public:
    uint8_t                     *dat;                       ///< reassembly buffer

    uint32_t                    msg_magicNum;               ///< magic number of the message
    uint32_t                    msg_id;                     ///< message id
    uint32_t                    msg_len;                    ///< message length in bytes
    uint32_t                    msg_packN;                  ///< number of fragments

    std::set<uint32_t>          bitmap;                     ///< indices of stored fragments

    double                      timeStamp;                  ///< last receiving timeStamp

private:
    // The object owns `dat`; a member-wise copy would delete it twice, so
    // copying is disabled (declared, never defined). Share it via SPtr.
    NetTransfer_Packet_Recv(const NetTransfer_Packet_Recv&);
    NetTransfer_Packet_Recv& operator=(const NetTransfer_Packet_Recv&);
};



///
/// \brief Packet bookkeeping shared by the sending and the receiving side.
///
/// Receiving: datagrams are validated and their payload is collected per
/// message id in m_recvBuff; a message whose fragments are all present moves
/// to m_recvPackQueue (guarded by m_recvMutex), from where recv_getPack()
/// hands it out.
///
/// Sending: send_data() cuts a message into packets and appends them to
/// m_sendPackQueue (guarded by m_sendMutex); send_popPack() removes them.
///
/// m_recvBuff itself is not guarded by a mutex in this class.
///
class NetTransfer_UDP::NetTransfer_PacketPool
{
public:
    typedef std::map<uint32_t, SPtr<NetTransfer_Packet_Recv> > PacketPool_RecvMap;

public:
    NetTransfer_PacketPool() : m_sendTotalSize(0) {}
    ~NetTransfer_PacketPool() {}

    ///
    /// \brief Process one received datagram.
    ///
    /// \param dat              - datagram (header + payload)
    /// \param len              - datagram length in bytes
    /// \param checkMode        - forwarded to parsePacketHeader()
    ///
    /// \return 0 when the datagram was accepted, -1 when it was dropped
    ///         (malformed, corrupted or inconsistent with earlier fragments
    ///         of the same message)
    ///
    int recv_pushData(uint8_t *dat, uint32_t len, int checkMode) {
        uint32_t magicNum   = 0;
        uint32_t msgID      = 0;
        uint32_t msgSize    = 0;
        uint32_t packN      = 0;
        uint32_t packIdx    = 0;
        uint32_t packLen    = 0;
        uint32_t CRC_header = 0;
        uint32_t CRC_data   = 0;

        // parse received data
        int ret_parse = parsePacketHeader(dat, len,
                                          magicNum,
                                          msgID, msgSize, packN, packIdx, packLen,
                                          CRC_header, CRC_data,
                                          checkMode);

        if( ret_parse < 0 ) {
            dbg_pe("error in parsePacketHeader! errCode = %d", ret_parse);
            return -1;
        }

        // The header values are untrusted. Require a fragment index inside
        // the message, a payload no longer than one fragment, and a message
        // length that matches the fragment count (both "len/DATASIZE + 1" and
        // "ceil(len/DATASIZE)" senders satisfy this). This bounds the
        // reassembly buffer by the declared message length and keeps every
        // later copy inside that buffer.
        uint64_t capacity = static_cast<uint64_t>(packN) * NETTRANSFER_PACKET_DATASIZE;
        if( packN == 0 || packIdx >= packN ||
            packLen > NETTRANSFER_PACKET_DATASIZE ||
            msgSize > capacity ||
            capacity - NETTRANSFER_PACKET_DATASIZE > msgSize ) {
            dbg_pe("inconsistent packet header! msgID = %u, pack = %u/%u, len = %u",
                   (unsigned) msgID, (unsigned) packIdx, (unsigned) packN, (unsigned) packLen);
            return -1;
        }

        // get the packet in pool, or create it for the first fragment
        SPtr<NetTransfer_Packet_Recv> pack;
        PacketPool_RecvMap::iterator it = m_recvBuff.find(msgID);

        if( it == m_recvBuff.end() ) {
            pack = SPtr<NetTransfer_Packet_Recv>(new NetTransfer_Packet_Recv);

            pack->msg_magicNum  = magicNum;
            pack->msg_id        = msgID;
            pack->msg_len       = msgSize;
            pack->msg_packN     = packN;

            if( pack->malloc() != 0 || pack->dat == NULL ) {
                dbg_pe("can not allocate receive buffer! msgID = %u", (unsigned) msgID);
                return -1;
            }
        } else {
            pack = it->second;     // shares the stored object

            // a fragment that disagrees with the first one about the message
            // layout would be written with the wrong geometry
            if( pack->msg_packN != packN || pack->msg_len != msgSize ) {
                dbg_pe("packet does not match message! msgID = %u", (unsigned) msgID);
                return -1;
            }
        }

        // insert to packet
        int f = pack->inputData(dat+NETTRANSFER_PACKET_HEADER_SIZE, packLen, packIdx);
        if( f > 0 ) {
            // message complete: insert to packet queue
            {
                ScopedMutex m(m_recvMutex);
                m_recvPackQueue.push_back(pack);
            }

            if( it != m_recvBuff.end() ) m_recvBuff.erase(it);

            return 0;
        }

        // insert to recv buffer
        if( it == m_recvBuff.end() ) {
            m_recvBuff.insert(std::make_pair(msgID, pack));
        }

        return 0;
    }

    /// \return the oldest completely received message, or an empty SPtr when
    ///         the queue is empty
    SPtr<NetTransfer_Packet_Recv> recv_getPack(void) {
        SPtr<NetTransfer_Packet_Recv> p;

        ScopedMutex m(m_recvMutex);

        if( m_recvPackQueue.size() > 0 ) {
            p = m_recvPackQueue.front();
            m_recvPackQueue.pop_front();
        }

        return p;
    }

    ///
    /// \brief Decode and verify the header of a datagram.
    ///
    /// \param dat              - datagram (header + payload)
    /// \param len              - datagram length in bytes
    /// \param magicNum .. CRC_data - receive the eight header words
    /// \param checkmode        - not used by this function
    ///
    /// \return  0 on success,
    ///         -1 datagram missing / shorter than the header / header CRC wrong,
    ///         -2 datagram shorter than header + packLen,
    ///         -3 payload CRC wrong,
    ///         -4 magic number wrong
    ///
    int parsePacketHeader(uint8_t *dat, uint32_t len,
                          uint32_t &magicNum,
                          uint32_t &msgID,
                          uint32_t &msgSize,
                          uint32_t &packN,
                          uint32_t &packIdx,
                          uint32_t &packLen,
                          uint32_t &CRC_header,
                          uint32_t &CRC_data,
                          int checkmode)
    {
        (void) checkmode;

        // a datagram shorter than the header must not be read as one
        if( dat == NULL || len < NETTRANSFER_PACKET_HEADER_SIZE ) return -1;

        // get header data (memcpy: no alignment requirement on dat)
        uint32_t header[8];

        for(int i=0; i<8; i++) {
            uint32_t w;
            memcpy(&w, dat + 4*i, sizeof(w));
            header[i] = convByteOrder_n2h(w);
        }

        magicNum    = header[0];
        msgID       = header[1];
        msgSize     = header[2];
        packN       = header[3];
        packIdx     = header[4];
        packLen     = header[5];
        CRC_header  = header[6];
        CRC_data    = header[7];

        // check header CRC
        uint32_t cHeader = crc32(header, 24);
        if( cHeader != CRC_header ) return -1;

        // check magic number
        if( magicNum != NETTRANSFER_PACKET_MAGICNUM ) return -4;

        // check packet length; subtraction form so a huge packLen cannot
        // wrap around and pass
        if( packLen > len - NETTRANSFER_PACKET_HEADER_SIZE ) return -2;

        // check data CRC
        uint32_t cData = crc32(dat+NETTRANSFER_PACKET_HEADER_SIZE, packLen);
        if( cData != CRC_data ) return -3;

        return 0;
    }

    /// \return number of completely received messages waiting in the queue
    int recv_getQueueSize() {
        ScopedMutex m(m_recvMutex);
        return m_recvPackQueue.size();
    }

    ///
    /// \brief Drop partially received messages that got no new fragment for
    ///        more than 10 seconds.
    ///
    /// Nothing is dropped while the buffer holds two entries or fewer.
    ///
    /// \return number of dropped messages
    ///
    int removeIncompletePack(void) {
        const double dtMax = 10.0;

        int nRemoved = 0;

        if( m_recvBuff.size() <= 2 ) return nRemoved;

        double tNow = tm_getTimeStamp();

        PacketPool_RecvMap::iterator it = m_recvBuff.begin();
        while( it != m_recvBuff.end() ) {
            SPtr<NetTransfer_Packet_Recv> &p = it->second;

            if( p.get() && (tNow - p->timeStamp) > dtMax ) {
                dbg_pt("delete packet ID: %ld", static_cast<long>(p->msg_id));

                // post-increment: the iterator moves on before its element
                // is erased
                m_recvBuff.erase(it++);
                nRemoved++;
            } else {
                ++it;
            }
        }

        return nRemoved;
    }

    ///
    /// \brief Cut a message into packets and queue them for sending.
    ///
    /// \param dat              - message data
    /// \param len              - message length in bytes
    /// \param msgID            - message id
    /// \param checkMode        - forwarded to generatePacket()
    ///
    /// \return total number of queued bytes (headers included), or -1 on
    ///         invalid arguments or when a packet could not be generated
    ///
    int send_data(uint8_t *dat, int len, uint32_t msgID, int checkMode)
    {
        if( len < 0 ) return -1;
        if( dat == NULL && len > 0 ) return -1;

        // number of fragments, rounded up; "len/DATASIZE + 1" produced one
        // fragment too many whenever len was an exact multiple, and that
        // extra fragment was read from beyond the end of dat for
        // len == DATASIZE. An empty message still travels as one packet.
        uint32_t packN = len / NETTRANSFER_PACKET_DATASIZE;
        if( len % NETTRANSFER_PACKET_DATASIZE != 0 ) packN++;
        if( packN == 0 ) packN = 1;

        int nSend = 0;

        for(uint32_t packIdx=0; packIdx<packN; packIdx++) {
            SPtr<NetTransfer_Packet_Send> packet = SPtr<NetTransfer_Packet_Send>(new NetTransfer_Packet_Send);

            int ret = packet->generatePacket(dat, len, msgID, packN, packIdx, checkMode);
            if( ret < 0 ) return -1;

            send_pushPack(packet);

            nSend += ret;
        }

        return nSend;
    }

    /// Append a packet to the send queue. \return 0
    int send_pushPack(SPtr<NetTransfer_Packet_Send> &pack) {
        ScopedMutex m(m_sendMutex);

        m_sendTotalSize += pack->m_len;
        m_sendPackQueue.push_back(pack);

        return 0;
    }

    /// \return the oldest queued packet, or an empty SPtr when the queue is
    ///         empty
    SPtr<NetTransfer_Packet_Send> send_popPack(void) {
        ScopedMutex m(m_sendMutex);

        SPtr<NetTransfer_Packet_Send> p;

        if( m_sendPackQueue.size() > 0 ) {
            p = m_sendPackQueue.front();
            m_sendTotalSize -= p->m_len;
            m_sendPackQueue.pop_front();
        }

        return p;
    }

    /// \return summed size of all queued packets, in bytes
    int send_getBufferSize() {
        ScopedMutex m(m_sendMutex);
        return m_sendTotalSize;
    }

    /// \return number of packets waiting in the send queue
    int send_getQueueSize() {
        ScopedMutex m(m_sendMutex);
        return m_sendPackQueue.size();
    }

    ///
    /// \brief Drop everything that is buffered or queued.
    ///
    /// \return 0
    ///
    int reset(void) {
        m_recvBuff.clear();

        {
            ScopedMutex m(m_recvMutex);
            m_recvPackQueue.clear();
        }

        {
            ScopedMutex m(m_sendMutex);
            m_sendPackQueue.clear();
            // keep the byte counter in step with the (now empty) queue
            m_sendTotalSize = 0;
        }

        return 0;
    }

protected:
    PacketPool_RecvMap                          m_recvBuff;         ///< partially received messages, by message id
    std::deque<SPtr<NetTransfer_Packet_Recv> >  m_recvPackQueue;    ///< completely received messages

    Semaphore                                   m_recvSem;
    Mutex                                       m_recvMutex;        ///< guards m_recvPackQueue


    std::deque<SPtr<NetTransfer_Packet_Send> >  m_sendPackQueue;    ///< packets waiting to be sent
    uint64_t                                    m_sendTotalSize;    ///< summed m_len of queued packets
    Mutex                                       m_sendMutex;        ///< guards m_sendPackQueue and m_sendTotalSize
};



////////////////////////////////////////////////////////////////////////////////
////////////////////////////////////////////////////////////////////////////////

///
/// \brief Set up an unconnected transfer object.
///
/// Names the worker thread, resets the connection state (not connected,
/// client role, port 30000, message id 0), reads the transfer rate from the
/// "NetTransfer_UDP.transferBPS" setting (default 10000) and creates the
/// packet pool.
///
NetTransfer_UDP::NetTransfer_UDP()
{
    pi::Thread::setName("NetTransfer_UDP");

    // connection state
    m_isConnected   = 0;
    m_isServer      = 0;
    m_msgID         = 0;
    m_port          = 30000;

    // transfer settings
    m_checkMode     = 0;
    m_transferBPS   = svar.GetInt("NetTransfer_UDP.transferBPS", 10000);

    // packet bookkeeping
    m_packPool      = SPtr<NetTransfer_PacketPool>(new NetTransfer_PacketPool);
}

///
/// \brief Close the connection (if any) before the object goes away.
///
NetTransfer_UDP::~NetTransfer_UDP()
{
    // the result only says whether a connection was open; nothing to do
    // with it here
    (void) close();
}

///
/// \brief Thread entry point: runs the receive loop on the server side and
///        the send loop otherwise.
///
void NetTransfer_UDP::threadFunc()
{
    if( !m_isServer ) {
        threadFunc_send();
        return;
    }

    threadFunc_recv();
}

///
/// \brief Send loop: takes packets from the pool, sends them through the
///        socket and throttles the output.
///
/// Throttling happens on two levels, both derived from m_transferBPS
/// (the code treats it as a rate in units of 1000 bytes per second):
///  - after every packet the loop sleeps for what remains of the time the
///    packet "should" take, but at least sendInterval microseconds;
///  - once the bytes sent since the last check are worth more than tCheck
///    seconds, the loop sleeps for any time it is ahead of schedule and
///    restarts the accounting.
///
void NetTransfer_UDP::threadFunc_send()
{
    const int       sendInterval = 10;      // minimum pause after a packet [us]
    const double    tCheck = 0.05;          // accounting window [s]

    int     bSend = 0;                      // bytes sent in the current window
    double  t1, tNow, dt;

    t1 = tm_getTimeStamp();

    while( !shouldStop() ) {
        // get a packet from queue and send it
        if( m_packPool->send_getQueueSize() > 0 ) {
            SPtr<NetTransfer_Packet_Send> cur_packet = m_packPool->send_popPack();
            if( cur_packet.get() != NULL ) {

                double tB = tm_getTimeStamp();
                // ret is local to this send on purpose: a result left over
                // from an earlier iteration must never be counted again
                int ret = m_socket.send(cur_packet->m_dat, cur_packet->m_len);
                double tE = tm_getTimeStamp();

                double ts = 1.0*ret / (1000.0*m_transferBPS);
                int su = sendInterval;
                if( tE - tB < ts ) su = (ts - (tE-tB))*1e6;
                if( su < sendInterval ) su = sendInterval;

                tm_sleep_us(su);

                if( ret > 0 ) bSend += ret;
            }
        } else {
            tm_sleep(1);
        }

        // keep send speed
        double tSend = 1.0*bSend / (1000.0 * m_transferBPS);
        if( tSend > tCheck ) {
            tNow = tm_getTimeStamp();
            dt = tNow - t1;

            if( dt < tSend ) {
                tm_sleep_us(1e6*(tSend - dt));
            }

            // disabled rate trace, kept for debugging
            if( 0 ) {
                dbg_pt("dt = %f, tSend = %f, bSend = %d, bps = %f MBPS (%f, %f)",
                       dt, tSend, bSend,
                       1.0*bSend/dt/1e6, 1.0*m_transferBPS/1e3, 1.0*bSend/(tm_getTimeStamp()-t1)/1e6);
            }

            t1 = tm_getTimeStamp();
            bSend = 0;
        }
    }
}

///
/// \brief Receive loop: reads datagrams from the socket and feeds them to
///        the packet pool until the thread is asked to stop or the socket
///        reports an error (negative return value).
///
/// Roughly every 0.2 seconds the pool is asked to drop stale, incomplete
/// messages.
///
void NetTransfer_UDP::threadFunc_recv()
{
    // create receiving buffer
    const int   bufSize = NETTRANSFER_PACKET_MAXSIZE;
    uint8_t     *buf = new uint8_t[bufSize];

    // loop until stop
    double t1 = tm_getTimeStamp();

    while( !shouldStop() ) {
        timer.enter("NetTransfer_UDP::recvThread");

        timer.enter("NetTransfer_UDP::recv");
        int ret = m_socket.recv(buf, bufSize);
        timer.leave("NetTransfer_UDP::recv");

        if( ret < 0 ) {
            // leave the outer timer section before quitting, otherwise the
            // enter/leave calls stay unbalanced
            timer.leave("NetTransfer_UDP::recvThread");
            break;
        }

        // put the received data to PacketPool
        if( ret == 0 ) {
            tm_sleep(1);
        } else {
            timer.enter("NetTransfer_UDP::recv_pushData");
            m_packPool->recv_pushData(buf, ret, m_checkMode);
            timer.leave("NetTransfer_UDP::recv_pushData");
        }

        // remove incomplete package
        double tNow = tm_getTimeStamp();
        if( tNow - t1 > 0.2 ) {
            m_packPool->removeIncompletePack();

            t1 = tNow;
        }

        timer.leave("NetTransfer_UDP::recvThread");
    }

    delete [] buf;
}


///
/// \brief Open the socket and start the worker thread.
///
/// \param isServer         - non-zero: start a server socket,
///                           zero: start a client socket
/// \param port             - port number
/// \param addr             - address (used by the client and by the
///                           multicast server)
/// \param st               - socket type; only SOCKET_UDP and
///                           SOCKET_UDP_MULTICAST start a socket
///
/// \return 0 on success; -1 when already connected or when the socket type
///         is not one of the two above; otherwise the value returned by the
///         socket start call
///
int NetTransfer_UDP::open(int isServer, int port, const std::string &addr, RSocketType st)
{
    if( m_isConnected ) return -1;

    // remember the connection parameters
    m_socketType    = st;
    m_isServer      = isServer;
    m_addr          = addr;
    m_port          = port;
    m_msgID         = 0;
    m_isConnected   = 0;

    // bring up the socket
    int status = -1;

    const bool udpLike = (m_socketType == SOCKET_UDP) ||
                         (m_socketType == SOCKET_UDP_MULTICAST);
    if( udpLike ) {
        if( !m_isServer ) {
            // UDP client
            status = m_socket.startClient(m_addr, m_port, m_socketType);
        } else if( m_socketType == SOCKET_UDP ) {
            // plain UDP server
            status = m_socket.startServer(m_port, m_socketType);
        } else {
            // multicast server
            status = m_socket.startServer(m_addr, m_port, m_socketType);
        }
    }

    if( status != 0 ) return status;

    // start TX/RX thread
    m_packPool->reset();
    m_msgID = 0;

    start();

    m_isConnected = 1;

    return status;
}

///
/// \brief Stop the worker thread and close the socket.
///
/// \return 0 when a connection was closed, -1 when there was none
///
int NetTransfer_UDP::close(void)
{
    if( m_isConnected ) {
        // stop thread
        pi::Thread::stop();
        pi::Thread::tryJoin(10);

        // close socket and return to the unconnected client state
        m_socket.close();
        m_isConnected = 0;
        m_isServer = 0;

        return 0;
    }

    return -1;
}

int NetTransfer_UDP::send(uint8_t *dat, int len)
{
    uint64_t    msgID;

    if( !m_isConnected ) return -1;
    if( len == 0 ) return 0;

    {
        ScopedMutex m(m_mutexSend);
        msgID = m_msgID++;
    }

    return m_packPool->send_data(dat, len, msgID, m_checkMode);
}

///
/// \brief Queue the contents of a data stream for sending.
///
/// \param ds               - stream whose data()/size() describe the message
///
/// \return result of send(uint8_t*, int) for that buffer
///
int NetTransfer_UDP::send(RDataStream &ds)
{
    const int result = send(ds.data(), ds.size());
    return result;
}

int NetTransfer_UDP::recv(uint8_t *dat, int len)
{
    if(m_isServer) return -1;

    SPtr<NetTransfer_Packet_Recv> p = m_packPool->recv_getPack();
    if( p.get() ) {
        uint32_t dl = p->msg_len;

        if( len < p->msg_len )
            memcpy(dat, p->dat, len);
        else
            memcpy(dat, p->dat, dl);

        return dl;
    }

    return -1;
}

///
/// \brief Fetch the next completely received message into a data stream.
///
/// The message buffer is passed to \p ds via fromRawData() and detached from
/// the received-message object afterwards.
///
/// \param ds               - stream that receives the message
///
/// \return message length when the length stored in the data-stream header
///         equals the received message length; -1 when it differs (the
///         stream is loaded nevertheless) or when no message is waiting
///
int NetTransfer_UDP::recv(RDataStream &ds)
{
    SPtr<NetTransfer_Packet_Recv> msg = m_packPool->recv_getPack();
    if( !msg.get() ) return -1;

    // read header
    ru32 ds_magic, ds_ver, ds_size;
    datastream_get_header(msg->dat, ds_magic, ds_ver);
    ds_size = datastream_get_length(msg->dat);

    int result = -1;
    if( ds_size == msg->msg_len ) result = msg->msg_len;

    ds.fromRawData(msg->dat, msg->msg_len, 0);

    // FIXME: this usage means the buffer now belongs to the DataStream, so
    //        the message object must not release it
    msg->dat = NULL;

    return result;
}

///
/// \brief Receive callback; not implemented.
///
/// Emits a warning and ignores both arguments.
///
/// \return 0
///
int NetTransfer_UDP::recvSlot(uint8_t *dat, int len)
{
    (void) dat;
    (void) len;

    dbg_pw("Not implemented!");

    return 0;
}

///
/// \brief Number of queued items on the active side.
///
/// \return on the server the number of received messages waiting in the
///         pool, otherwise the number of packets waiting to be sent
///
int NetTransfer_UDP::getQueueSize()
{
    return m_isServer ? m_packPool->recv_getQueueSize()
                      : m_packPool->send_getQueueSize();
}

} // end of namespace pi

